package cn.wolfcode.mq.listener;

import cn.wolfcode.common.domain.UserInfo;
import cn.wolfcode.common.exception.BusinessException;
import cn.wolfcode.common.web.CodeMsg;
import cn.wolfcode.common.web.Result;
import cn.wolfcode.domain.SeckillProductVo;
import cn.wolfcode.mq.MQConstant;
import cn.wolfcode.mq.OrderMQResult;
import cn.wolfcode.mq.OrderMessage;
import cn.wolfcode.mq.OrderTimeout;
import cn.wolfcode.service.IOrderInfoService;
import cn.wolfcode.service.ISeckillProductService;
import cn.wolfcode.web.msg.SeckillCodeMsg;
import com.alibaba.fastjson.JSON;
import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Component;

@RocketMQMessageListener(
        consumerGroup = MQConstant.ORDER_PENDING_CONSUMER_GROUP,
        topic = MQConstant.ORDER_PENDING_TOPIC
)
@Component
@Slf4j
public class OrderPendingMQMessageListener implements RocketMQListener<OrderMessage> {

    @Autowired
    private IOrderInfoService orderInfoService;

    @Autowired
    private ISeckillProductService seckillProductService;
    @Autowired
    private RocketMQTemplate rocketMQTemplate;

    @Override
    public void onMessage(OrderMessage message) {
        log.info("[创建订单] 收到创建订单消息：{}", JSON.toJSONString(message));

        // 创建订单结果对象
        OrderMQResult result = new OrderMQResult("", "订单创建成功", 200, message.getToken());
        String orderNo = "";

        try {
            SeckillProductVo vo = seckillProductService.selectByIdAndTime(message.getSeckillId(), message.getTime());
            // 创建订单
            UserInfo userInfo = new UserInfo();
            userInfo.setPhone(message.getUserPhone());
            orderNo = orderInfoService.doSeckill(userInfo, vo);
            // 设置订单编号到订单结果对象
            result.setOrderNo(orderNo);
        } catch (BusinessException e) {
            log.warn("[创建订单] 订单创建失败：{}", e.getMessage());
            CodeMsg codeMsg = e.getCodeMsg();
            result.setMsg(codeMsg.getMsg());
            result.setCode(codeMsg.getCode());
        } catch (Exception e) {
            log.error("[创建订单] 订单创建失败：{}", e.getMessage());
            // 回补 Redis 令牌以及取消用户下单标记
            result.setMsg(SeckillCodeMsg.SECKILL_ERROR.getMsg());
            result.setCode(SeckillCodeMsg.SECKILL_ERROR.getCode());
        }

        // 发送订单创建结果消息
        SendResult send = rocketMQTemplate.syncSend(MQConstant.ORDER_RESULT_TOPIC, result);
        log.info("[订单结果] 发送创建订单结果消息: msgId={}, msg={}", send.getMsgId(), JSON.toJSONString(result));

        // 对下单失败的订单进行额外处理
        if (result.getCode() != Result.SUCCESS_CODE) {
            // 令牌记录, 用户下单记录, 本地标识
            orderInfoService.createOrderFallbackRedis(message.getSeckillId(), message.getTime(), message.getUserPhone());
        } else {
            log.info("[延迟消息] 订单创建成功, 发送延迟消息: orderNo={}", orderNo);
            // 发送延迟消息, 检查订单是否支付成功
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
            OrderTimeout timeout = new OrderTimeout(message.getTime(), message.getSeckillId(), message.getUserPhone(), orderNo);
            Message<OrderTimeout> msg = MessageBuilder.withPayload(timeout).build();
            rocketMQTemplate.syncSend(MQConstant.ORDER_PAY_TIMEOUT_TOPIC, msg, 2000, MQConstant.ORDER_PAY_TIMEOUT_DELAY_LEVEL);
        }
    }
}
